package com.rabbitmq.origin.api;

import com.rabbitmq.client.*;
import com.rabbitmq.common.utils.RabbitMqUtil;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

/**
 * @author
 * @Describe 消费fanout类型消息。
 * @date
 */
@Slf4j(topic = "RabbitMqConsumerFanout")
public class RabbitMqConsumerFanout {

    /**
     * 消费者1，使用临时队列存放消息并消费
     *
     * @param exchangeName
     */
    public void consumerFanout(String exchangeName, int flag) {
        Connection conn = null;
        Channel channel = null;
        try {
            // 获取连接
            conn = RabbitMqUtil.getConn();
            // 创建通道（通道可用于消息的发送和接收）
            channel = conn.createChannel();

            // 声明交换机
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true);

            // 声明临时队列，非持久，独占，自动删除队列(客户端停止，对应的临时队列会自动删除)，每次都是一个新队列
            String queueName = channel.queueDeclare().getQueue();

            // 绑定队列,广播模式不需要路由key
            channel.queueBind(queueName, exchangeName, "");

            // 创建队列消费者
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    if (flag % 3 == 1) {
                        try {
                            Thread.sleep(1000);
                        } catch (Exception e) {
                            log.error("Thread.sleep 异常", e);
                        }
                    }
                    log.info(" consumer-" + flag + " handleDelivery " + envelope.getRoutingKey() + ":'" + message + "'");
                }
            };

            // 消息消费，自动确认
            channel.basicConsume(queueName, true, consumer);
        } catch (Exception e) {
            log.error("消费消息异常,", e);
            // 资源关闭
            RabbitMqUtil.close(conn, channel);
        }
    }

    /**
     * fanout消息队列进行消费,使用手工确认机制处理
     *
     * @param exchangeName
     */
    public void consumerFanoutManualConfirmed(String exchangeName, int flag) {
        Connection conn = null;
        final Channel channel;
        try {
            // 获取连接
            conn = RabbitMqUtil.getConn();
            // 创建通道（通道可用于消息的发送和接收）
            channel = conn.createChannel();

            // 设置服务器将传递的最大消息数为1，此时只能消费一个消息，如果没有限制则为 0
            channel.basicQos(1);

            // 声明广播类型的交换机
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true);

            // 声明临时队列
            String queueName = channel.queueDeclare().getQueue();

            // 队列绑定交换机，该模式不需要路由
            channel.queueBind(queueName, exchangeName, "", null);

            // 创建队列消费者
            String threadName = Thread.currentThread().getName();
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    if (flag == 1) {
                        try {
                            Thread.sleep(2000);
                        } catch (Exception e) {
                            log.error("Thread.sleep 异常", e);
                        }
                    }

                    if (flag == 2) {
                        try {
                            Thread.sleep(1000);
                        } catch (Exception e) {
                            log.error("Thread.sleep 异常", e);
                        }
                    }

                    log.info(threadName + " consumer-" + flag + " handleDelivery " + envelope.getRoutingKey() + ":'" + message + "'");

                    // 手动确认消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    // log.info(threadName + "手工确认完毕");
                }
            };

            // 消息消费，自动确认消息设置为false
            channel.basicConsume(queueName, false, "fanoutConsumer", consumer);
        } catch (Exception e) {
            log.error("消费消息异常,", e);
            // 资源关闭
            RabbitMqUtil.close(conn, null);
        }
    }
}
